import org.apache.flink.util.Collector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;

/**
 * Group function that sums the integer field ({@code f1}) of a group of
 * {@code Tuple2<String, Integer>} records.
 *
 * <p>{@link #combine} emits the result as a {@code Tuple2<String, Integer>}
 * (same shape as the input), while {@link #reduce} emits it as a
 * {@code "key-sum"} string.
 *
 * <p>Both methods emit nothing when the supplied group contains no records,
 * so no placeholder record with a {@code null} key is ever produced for an
 * empty group.
 */
public class MyCombinableGroupReducer implements
        GroupReduceFunction<Tuple2<String, Integer>, String>,
        GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
{

    // -------------------------- reduce operator override --------------------------

    /**
     * Sums {@code f1} over all records of the group and emits a single string
     * of the form {@code key + "-" + sum}.
     *
     * @param in  records of the group
     * @param out collector receiving the {@code "key-sum"} string; not called
     *            when {@code in} is empty
     */
    @Override
    public void reduce(Iterable<Tuple2<String, Integer>> in,
                       Collector<String> out) {
        Tuple2<String, Integer> total = aggregate(in);
        if (total == null) {
            // Empty group: nothing meaningful to emit.
            return;
        }
        // concat key and sum and emit
        out.collect(total.f0 + "-" + total.f1);
    }

    // -------------------------- combine operator override --------------------------

    /**
     * Sums {@code f1} over all records of the group and emits a single tuple
     * holding the key and the sum.
     *
     * @param in  records of the group
     * @param out collector receiving the {@code (key, sum)} tuple; not called
     *            when {@code in} is empty
     */
    @Override
    public void combine(Iterable<Tuple2<String, Integer>> in,
                        Collector<Tuple2<String, Integer>> out)
    {
        Tuple2<String, Integer> total = aggregate(in);
        if (total == null) {
            // Empty group: emitting a tuple with a null key would be invalid data.
            return;
        }
        // emit tuple with key and sum
        out.collect(total);
    }

    /**
     * Shared accumulation used by both {@link #reduce} and {@link #combine}.
     *
     * <p>The key is taken from the last record seen; the sum uses plain
     * {@code int} arithmetic (it wraps on overflow, as in the original code).
     *
     * @param in records to aggregate
     * @return a new {@code (key, sum)} tuple, or {@code null} if {@code in}
     *         yielded no records
     */
    private static Tuple2<String, Integer> aggregate(Iterable<Tuple2<String, Integer>> in) {
        String key = null;
        int sum = 0;
        boolean sawRecord = false;

        for (Tuple2<String, Integer> curr : in) {
            key = curr.f0;
            sum += curr.f1;
            sawRecord = true;
        }

        return sawRecord ? new Tuple2<>(key, sum) : null;
    }

}